/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.zookeeper;

import static org.apache.hadoop.hbase.HConstants.DEFAULT_ZK_SESSION_TIMEOUT;
import static org.apache.hadoop.hbase.HConstants.ZK_SESSION_TIMEOUT;

import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;

/**
 * A very simple read only zookeeper implementation without watcher support.
 */
@InterfaceAudience.Private
public final class ReadOnlyZKClient implements Closeable {

    private static final Logger LOG = LoggerFactory.getLogger(ReadOnlyZKClient.class);

    public static final String RECOVERY_RETRY = "zookeeper.recovery.retry";

    private static final int DEFAULT_RECOVERY_RETRY = 30;

    public static final String RECOVERY_RETRY_INTERVAL_MILLIS = "zookeeper.recovery.retry.intervalmill";

    private static final int DEFAULT_RECOVERY_RETRY_INTERVAL_MILLIS = 1000;

    public static final String KEEPALIVE_MILLIS = "zookeeper.keep-alive.time";

    private static final int DEFAULT_KEEPALIVE_MILLIS = 60000;

    private static final EnumSet<Code> FAIL_FAST_CODES = EnumSet.of(Code.NOAUTH, Code.AUTHFAILED);

    private final String connectString;

    private final int sessionTimeoutMs;

    private final int maxRetries;

    private final int retryIntervalMs;

    private final int keepAliveTimeMs;

    private static abstract class Task implements Delayed {

        protected long time = System.nanoTime();

        public boolean needZk() {
            return false;
        }

        public void exec(ZooKeeper zk) {
        }

        public void connectFailed(IOException e) {
        }

        public void closed(IOException e) {
        }

        @Override
        public int compareTo(Delayed o) {
            Task that = (Task) o;
            int c = Long.compare(time, that.time);
            if(c != 0) {
                return c;
            }
            return Integer.compare(System.identityHashCode(this), System.identityHashCode(that));
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(time - System.nanoTime(), TimeUnit.NANOSECONDS);
        }
    }

    private static final Task CLOSE = new Task() {
    };

    private final DelayQueue<Task> tasks = new DelayQueue<>();

    private final AtomicBoolean closed = new AtomicBoolean(false);

    @VisibleForTesting
    ZooKeeper zookeeper;

    private int pendingRequests = 0;

    private String getId() {
        return String.format("0x%08x", System.identityHashCode(this));
    }

    public ReadOnlyZKClient(Configuration conf) {
        // We might use a different ZK for client access
        String clientZkQuorumServers = ZKConfig.getClientZKQuorumServersString(conf);
        if(clientZkQuorumServers != null) {
            this.connectString = clientZkQuorumServers;
        } else {
            this.connectString = ZKConfig.getZKQuorumServersString(conf);
        }
        this.sessionTimeoutMs = conf.getInt(ZK_SESSION_TIMEOUT, DEFAULT_ZK_SESSION_TIMEOUT);
        this.maxRetries = conf.getInt(RECOVERY_RETRY, DEFAULT_RECOVERY_RETRY);
        this.retryIntervalMs = conf.getInt(RECOVERY_RETRY_INTERVAL_MILLIS, DEFAULT_RECOVERY_RETRY_INTERVAL_MILLIS);
        this.keepAliveTimeMs = conf.getInt(KEEPALIVE_MILLIS, DEFAULT_KEEPALIVE_MILLIS);
        LOG.debug("Connect {} to {} with session timeout={}ms, retries {}, " + "retry interval {}ms, keepAlive={}ms", getId(), connectString,
                sessionTimeoutMs, maxRetries, retryIntervalMs, keepAliveTimeMs);
        Threads.setDaemonThreadRunning(new Thread(this::run), "ReadOnlyZKClient-" + connectString + "@" + getId());
    }

    private abstract class ZKTask<T> extends Task {

        protected final String path;

        private final CompletableFuture<T> future;

        private final String operationType;

        private int retries;

        protected ZKTask(String path, CompletableFuture<T> future, String operationType) {
            this.path = path;
            this.future = future;
            this.operationType = operationType;
        }

        protected final void onComplete(ZooKeeper zk, int rc, T ret, boolean errorIfNoNode) {
            tasks.add(new Task() {

                @Override
                public void exec(ZooKeeper alwaysNull) {
                    pendingRequests--;
                    Code code = Code.get(rc);
                    if(code == Code.OK) {
                        future.complete(ret);
                    } else if(code == Code.NONODE) {
                        if(errorIfNoNode) {
                            future.completeExceptionally(KeeperException.create(code, path));
                        } else {
                            future.complete(ret);
                        }
                    } else if(FAIL_FAST_CODES.contains(code)) {
                        future.completeExceptionally(KeeperException.create(code, path));
                    } else {
                        if(code == Code.SESSIONEXPIRED) {
                            LOG.warn("{} to {} session expired, close and reconnect", getId(), connectString);
                            try {
                                zk.close();
                            } catch(InterruptedException e) {
                            }
                        }
                        if(ZKTask.this.delay(retryIntervalMs, maxRetries)) {
                            LOG.warn("{} to {} failed for {} of {}, code = {}, retries = {}", getId(), connectString, operationType, path, code,
                                    ZKTask.this.retries);
                            tasks.add(ZKTask.this);
                        } else {
                            LOG.warn("{} to {} failed for {} of {}, code = {}, retries = {}, give up", getId(), connectString, operationType, path,
                                    code, ZKTask.this.retries);
                            future.completeExceptionally(KeeperException.create(code, path));
                        }
                    }
                }

                @Override
                public void closed(IOException e) {
                    // It may happen that a request is succeeded and the onComplete has been called and pushed
                    // us into the task queue, but before we get called a close is called and here we will
                    // fail the request, although it is succeeded actually.
                    // This is not a perfect solution but anyway, it is better than hang the requests for
                    // ever, and also acceptable as if you close the zk client before actually getting the
                    // response then a failure is always possible.
                    future.completeExceptionally(e);
                }
            });
        }

        @Override
        public boolean needZk() {
            return true;
        }

        protected abstract void doExec(ZooKeeper zk);

        @Override
        public final void exec(ZooKeeper zk) {
            pendingRequests++;

            /********
             * TODO_MA 马中华 https://blog.csdn.net/zhongqi2513
             *   注释：
             */
            doExec(zk);
        }

        public boolean delay(long intervalMs, int maxRetries) {
            if(retries >= maxRetries) {
                return false;
            }
            retries++;
            time = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(intervalMs);
            return true;
        }

        @Override
        public void connectFailed(IOException e) {
            if(delay(retryIntervalMs, maxRetries)) {
                LOG.warn("{} to {} failed to connect to zk fo {} of {}, retries = {}", getId(), connectString, operationType, path, retries, e);
                tasks.add(this);
            } else {
                LOG.warn("{} to {} failed to connect to zk fo {} of {}, retries = {}, give up", getId(), connectString, operationType, path, retries,
                        e);
                future.completeExceptionally(e);
            }
        }

        @Override
        public void closed(IOException e) {
            future.completeExceptionally(e);
        }
    }

    public CompletableFuture<byte[]> get(String path) {
        if(closed.get()) {
            return FutureUtils.failedFuture(new DoNotRetryIOException("Client already closed"));
        }
        CompletableFuture<byte[]> future = new CompletableFuture<>();

        /********
         * TODO_MA 马中华 https://blog.csdn.net/zhongqi2513
         *   注释： ZKTask 加入 tasks 队列，去寻找  tasks.poll 代码，
         *   会看到调用 zkTasks.run() 代码执行，里面执行 doExec(ZooKeeper zk) 方法
         */
        tasks.add(new ZKTask<byte[]>(path, future, "get") {

            @Override
            protected void doExec(ZooKeeper zk) {

                /********
                 * TODO_MA 马中华 https://blog.csdn.net/zhongqi2513
                 *   注释： 终于看到 meta region 的位置是从 zk 获取的
                 *   path = /hbase/meta-region-server/  meta表的唯一的region的位置信息
                 */
                zk.getData(path, false, (rc, path, ctx, data, stat) -> onComplete(zk, rc, data, true), null);
            }
        });
        return future;
    }

    public CompletableFuture<Stat> exists(String path) {
        if(closed.get()) {
            return FutureUtils.failedFuture(new DoNotRetryIOException("Client already closed"));
        }
        CompletableFuture<Stat> future = new CompletableFuture<>();
        tasks.add(new ZKTask<Stat>(path, future, "exists") {

            @Override
            protected void doExec(ZooKeeper zk) {
                zk.exists(path, false, (rc, path, ctx, stat) -> onComplete(zk, rc, stat, false), null);
            }
        });
        return future;
    }

    public CompletableFuture<List<String>> list(String path) {
        if(closed.get()) {
            return FutureUtils.failedFuture(new DoNotRetryIOException("Client already closed"));
        }
        CompletableFuture<List<String>> future = new CompletableFuture<>();
        tasks.add(new ZKTask<List<String>>(path, future, "list") {

            @Override
            protected void doExec(ZooKeeper zk) {
                zk.getChildren(path, false, (rc, path, ctx, children) -> onComplete(zk, rc, children, true), null);
            }
        });
        return future;
    }

    private void closeZk() {
        if(zookeeper != null) {
            try {
                zookeeper.close();
            } catch(InterruptedException e) {
            }
            zookeeper = null;
        }
    }

    private ZooKeeper getZk() throws IOException {
        // may be closed when session expired
        if(zookeeper == null || !zookeeper.getState().isAlive()) {
            zookeeper = new ZooKeeper(connectString, sessionTimeoutMs, e -> {
            });
        }
        return zookeeper;
    }

    private void run() {
        for(; ; ) {
            Task task;
            try {

                /********
                 * TODO_MA 马中华 https://blog.csdn.net/zhongqi2513
                 *   注释： 取出任务
                 */
                task = tasks.poll(keepAliveTimeMs, TimeUnit.MILLISECONDS);
            } catch(InterruptedException e) {
                continue;
            }
            if(task == CLOSE) {
                break;
            }
            if(task == null) {
                if(pendingRequests == 0) {
                    LOG.trace("{} to {} inactive for {}ms; closing (Will reconnect when new requests)", getId(), connectString, keepAliveTimeMs);
                    closeZk();
                }
                continue;
            }
            if(!task.needZk()) {

                /********
                 * TODO_MA 马中华 https://blog.csdn.net/zhongqi2513
                 *   注释： 执行
                 */
                task.exec(null);
            } else {
                ZooKeeper zk;
                try {
                    zk = getZk();
                } catch(IOException e) {
                    task.connectFailed(e);
                    continue;
                }
                task.exec(zk);
            }
        }
        closeZk();
        DoNotRetryIOException error = new DoNotRetryIOException("Client already closed");
        Arrays.stream(tasks.toArray(new Task[0])).forEach(t -> t.closed(error));
        tasks.clear();
    }

    @Override
    public void close() {
        if(closed.compareAndSet(false, true)) {
            LOG.debug("Close zookeeper connection {} to {}", getId(), connectString);
            tasks.add(CLOSE);
        }
    }

    @VisibleForTesting
    public String getConnectString() {
        return connectString;
    }
}
